package spark.structed_streaming

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.streaming.Trigger
import org.apache.spark.sql.types.{StringType, StructField, StructType}

/**
 * Structured Streaming demo job.
 *
 * Subscribes to the Kafka topic `socol_cmdinfo`, parses each message's `value`
 * (cast to a string) as JSON against `socolSchema`, flattens the parsed struct
 * into top-level columns and prints every micro-batch to the console.
 */
object SparkStructed02 {

  /** Column names of a `socol_cmdinfo` record, in output order. */
  private val socolFields: List[String] = List(
    "cmd_pic_count",
    "cmd_type",
    "content",
    "create_time",
    "device_no",
    "id",
    "imei",
    "isDeleted",
    "mg_ack_state",
    "mg_ack_time",
    "request_pic_count",
    "result_report_time",
    "return_pic_count",
    "send_time",
    "seq",
    "source",
    "state",
    "third_ack_state",
    "third_ack_time",
    "ts",
    "update_time"
  )

  /** Every field is read as a nullable string; no type coercion happens at parse time. */
  private val socolSchema: StructType =
    StructType(socolFields.map(name => StructField(name, StringType, nullable = true)))

  /**
   * Timestamp pattern handed to `from_json`.
   *
   * `SSS` is fraction-of-second (milliseconds). The previous lowercase `sss` meant
   * second-of-minute, which was wrong and is rejected by Spark 3's datetime parser.
   * NOTE(review): this option only affects TimestampType fields; every field in
   * `socolSchema` is currently a string, so it has no effect until a field is retyped.
   */
  private val nestTimestampFormat: String = "yyyy-MM-dd'T'HH:mm:ss.SSS'Z'"

  /** Builds the streaming query and blocks until it terminates. */
  def main(args: Array[String]): Unit = {
    import org.apache.spark.sql.functions.{col, from_json}

    val spark = SparkSession
      .builder()
      .master("local[*]")
      .appName("StructedStreaming")
      .getOrCreate()

    val jsonOptions = Map("timestampFormat" -> nestTimestampFormat)

    // Alternative local source used during development: the file-based JSON source
    // needs an explicit schema.
    //   spark.readStream.format("json").schema(socolSchema)
    //     .load("/Users/tanweihua/Desktop/socol_mock_data/")
    //
    // The Kafka source exposes a fixed schema (key, value, topic, ...), so no
    // .schema(...) is given here. Instead, the JSON payload in `value` is parsed
    // with from_json.
    val lines = spark
      .readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "hadoop8:9092,hadoop9:9092,hadoop10:9092")
      .option("subscribe", "socol_cmdinfo")
      .load()
      .select(from_json(col("value").cast("string"), socolSchema, jsonOptions).alias("parsed_value"))
      // Promote the parsed struct's fields to top-level columns.
      .select("parsed_value.*")

    val query = lines
      .writeStream
      .outputMode("append")
      // An interval of 0 starts the next micro-batch as soon as the previous one finishes.
      .trigger(Trigger.ProcessingTime(0))
      .format("console")
      .start()

    query.awaitTermination()
  }
}
